package net.vipmro.search.importer.service.b2r;

import com.alibaba.fastjson.JSONObject;
import net.vipmro.search.dto.entity.GoodsAttr;
import net.vipmro.search.dto.entity.SellerGoodsInfo;
import net.vipmro.search.dto.pojo.DataPacket;
import net.vipmro.search.dto.pojo.TableData;
import net.vipmro.search.elasticsearch.ElasticsearchHandler;
import net.vipmro.search.importer.common.BaseService;
import net.vipmro.search.importer.dao.SellerGoodsInfoDAO;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static net.vipmro.search.core.constants.PageConstant.SIZE_5000;

/**
 * @author fengxiangyang
 * @date 2018/12/4
 */
@Service
public class SellerGoodsInfoServiceImpl extends BaseService implements SellerGoodsInfoService {
    private static final Logger logger = LoggerFactory.getLogger(SellerGoodsInfoServiceImpl.class);
    /**
     * 索引名称
     */
    @Value("${elasticsearch.index-name.b2r-goods}")
    private String indexName;

    @Resource
    private SellerGoodsInfoDAO sellerGoodsInfoDAO;
    @Resource
    private ElasticsearchHandler elasticsearchHandler;
    /**
     * 全量更新
     */
    @Override
    public void reindex(){
        long id = 0L;
        int count = 0;
        while (true) {
            logger.info("b2r insert start {}", id);
            final List<SellerGoodsInfo> list = sellerGoodsInfoDAO.findPage(id, SIZE_5000);
             if (list.isEmpty()) {
                break;
            }
            id = list.stream().map(o -> o.getId()).reduce(Long::max).get();
            importReindexData(list);
            count += list.size();
            logger.info("b2r insert end id>>>{} mq.size>>>{}", id, count);
        }
        logger.info("b2r初始化数据生产完成，count={}", count);
    }

    /**
     * 增量更新
     */
    @Override
    public void update(DataPacket dataPacket){
        final List<TableData> dataList = dataPacket.getDataList();
        //使用Set去重
        Set<String> deleteIds = new HashSet<>();
        Set<String> deleteGoodsIds = new HashSet<>();
        Set<String> importIds = new HashSet<>();
        Set<String> importGoodsIds = new HashSet<>();
        for (TableData tableData: dataList){
            final String table = tableData.getTable();
            final String type = tableData.getType();
            final Map<String, Object> data = tableData.getData();
            if("seller_goods".equals(table)) {
                final String id = String.valueOf(data.get("id"));
                if("delete".equals(type) ){
                    deleteIds.add(id);
                }else{
                    final String status = String.valueOf(data.get("status"));
                    final String isShow = String.valueOf(data.get("is_show"));
                    final String channel = String.valueOf(data.get("channel"));
                    if("1".equals(status) && "1".equals(isShow) && "1".equals(channel)) {
                        importIds.add(id);
                    }else{
                        deleteIds.add(id);
                    }
                }
            }else if("goods".equals(table)){
                final String id = String.valueOf(data.get("id"));
                if("delete".equals(type) ){
                    deleteGoodsIds.add(id);
                }else{
                    final String status = String.valueOf(data.get("status"));
                    if("1".equals(status)) {
                        importGoodsIds.add(id);
                    }else {
                        deleteGoodsIds.add(id);
                    }
                }
            } else if("brand".equals(table)){
                clearCacheBrand();
            } else if("category".equals(table)){
                clearCacheCategory();
            } else if("goods_attr".equals(table)){
                clearCacheGoodsAttr(String.valueOf(data.get("goods_id")));
            }
        }

        if(!deleteGoodsIds.isEmpty()){
            final String goodsIds = deleteGoodsIds.stream().collect(Collectors.joining(","));
            final List<String> ids = sellerGoodsInfoDAO.findByGoodsIds(goodsIds);
            deleteIds.addAll(ids);
        }

        if(!importGoodsIds.isEmpty()){
            final String goodsIds = importGoodsIds.stream().collect(Collectors.joining(","));
            final List<String> ids = sellerGoodsInfoDAO.findByGoodsIds(goodsIds);
            importIds.addAll(ids);
        }

        //数据删除
        if(!deleteIds.isEmpty()){
            deleteData(deleteGoodsIds);
        }
        //数据导入
        if(!importIds.isEmpty()){
            final String ids = importIds.stream().collect(Collectors.joining(","));
            final List<SellerGoodsInfo> list = sellerGoodsInfoDAO.findByIds(ids);
            importUpdateData(list);
        }
    }

    /**索引数据删除
     * @param set
     */
    private void deleteData(Set<String> set){
        BulkRequest bulkRequest = new BulkRequest();
        for (String id : set) {
            DeleteRequest deleteRequest = new DeleteRequest(indexName,"_doc",id);
            bulkRequest.add(deleteRequest);
        }
        logger.info("索引数据删除:bulk={}",JSONObject.toJSONString(bulkRequest));
        elasticsearchHandler.bulk(bulkRequest);
    }

    private void importReindexData(List<SellerGoodsInfo> list){
        Set<String> set = new HashSet<>();
        list.forEach(o->set.add(o.getGoodsId().toString()));
        final Map<Long, List<GoodsAttr>> goodsAttrMap = getGoodsAttrMap(set);

        BulkRequest bulkRequest = new BulkRequest();
        for (SellerGoodsInfo goodsInfo : list){
            final Map<String, Object> map = getSellerGoodsInfoMap(goodsInfo);
            final List<GoodsAttr> goodsAttrs = goodsAttrMap.get(goodsInfo.getGoodsId());
            if(goodsAttrs != null && !goodsAttrs.isEmpty()){
                map.put("attrId", goodsAttrs.stream().map(GoodsAttr::getAttrId).collect(Collectors.toList()));
                map.put("attrValueId", goodsAttrs.stream().map(GoodsAttr::getAttrValueId).collect(Collectors.toList()));
            }
            final Long id = goodsInfo.getId();
            IndexRequest indexRequest = new IndexRequest(indexName,"_doc", id.toString());
            indexRequest.source(map);
            bulkRequest.add(indexRequest);
        }
        elasticsearchHandler.bulk(bulkRequest);
    }

    /**
     * 索引数据导入
     * @param list
     */
    private void importUpdateData(List<SellerGoodsInfo> list){
        BulkRequest bulkRequest = new BulkRequest();
        for (SellerGoodsInfo goodsInfo : list){
            final Map<String, Object> map = getSellerGoodsInfoMap(goodsInfo);
            final List<GoodsAttr> goodsAttrs = getGoodsAttrs(goodsInfo.getGoodsId());
            if(goodsAttrs != null && !goodsAttrs.isEmpty()){
                map.put("attrId", goodsAttrs.stream().map(GoodsAttr::getAttrId).collect(Collectors.toList()));
                map.put("attrValueId", goodsAttrs.stream().map(GoodsAttr::getAttrValueId).collect(Collectors.toList()));
            }
            final Long id = goodsInfo.getId();
            IndexRequest indexRequest = new IndexRequest(indexName,"_doc", id.toString());
            indexRequest.source(map);
            bulkRequest.add(indexRequest);
        }
        logger.info("索引数据导入:bulk={}",JSONObject.toJSONString(bulkRequest));
        elasticsearchHandler.bulk(bulkRequest);
    }

}
